 1.Kafka集群与运维之集群监控
   
   1).监控度量指标
   Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标。Java客户端使用Kafka Metrics，它是一个内置
的度量标准注册表，可最大程度地减少拉入客户端应用程序的传递依赖项。两者都通过JMX公开指标，并且可以配置为
使用可插拔的统计报告器报告统计信息，以连接到您的监视系统。
   具体的监控指标可以查看官方文档。
   (1).JMX
     1).Kafka开启Jmx端口
   [root@linux123 bin]# vim /opt/kafka_2.12-1.0.2/bin/kafka-server-start.sh
   export JMX_PORT=9581
   所有kafka机器添加一个JMX_PORT ，并重启kafka
     2).验证JMX开启
   首先打印9581端口占用的进程信息，然后使用进程编号对应到Kafka的进程号，搞定。 
   [root@linux123 bin]# ss -nelp | grep 9581
   也可以查看Kafka启动日志，确定启动参数 -Dcom.sun.management.jmxremote.port=9581 存在即可
   (2).使用JConsole链接JMX端口
     1).win/mac，找到jconsole工具并打开， 在${JAVA_HOEM}/bin/
        Mac电脑可以直接命令行输入 jconsole
   详细的监控指标
   相见官方文档：http://kafka.apache.org/10/documentation.html#monitoring
   这里列出常用的：
   OS监控项
   objectName                             指标项           说明
   java.lang:type=OperatingSystem   FreePhysicalMemorySize 空闲物理内存
   java.lang:type=OperatingSystem   SystemCpuLoad          系统CPU利用率
   java.lang:type=OperatingSystem   ProcessCpuLoad         进程CPU利用率
   java.lang:type=GarbageCollector, CollectionCount        GC次数
   name=G1 Young Generation 
   broker指标
   objectName                              指标项 说明
   kafka.server:type=BrokerTopicMetrics,   Count  每秒输入的流量
   name=BytesInPerSec 
   kafka.server:type=BrokerTopicMetrics,   Count  每秒输出的流量
   name=BytesOutPerSec 
   kafka.server:type=BrokerTopicMetrics,   Count  每秒扔掉的流量
   name=BytesRejectedPerSec 
   kafka.server:type=BrokerTopicMetrics,   Count  每秒的消息写入总量
   name=MessagesInPerSec 
   kafka.server:type=BrokerTopicMetrics,   Count  当前机器每秒fetch请求失败的数量
   name=FailedFetchRequestsPerSec 
   kafka.server:type=BrokerTopicMetrics,   Count  当前机器每秒produce请求失败的数量
   name=FailedProduceRequestsPerSec
   kafka.server:type=ReplicaManager,       Value  该broker上的partition的数量
   name=PartitionCount 
   kafka.server:type=ReplicaManager,       Value  Leader的replica的数量
   name=LeaderCount 
   kafka.network:type=RequestMetrics,      Count  一个请求FetchConsumer耗费的所有时间
   name=TotalTimeMs,request=FetchConsumer
   kafka.network:type=RequestMetrics,      Count  一个请求FetchFollower耗费的所有时间
   name=TotalTimeMs,request=FetchFollower 
   kafka.network:type=RequestMetrics,      Count  一个请求Produce耗费的所有时间
   name=TotalTimeMs,request=Produce 
   
   producer以及topic指标
   objectName                                            指标项               官网说明
   kafka.producer:type=producer-metrics,clientid=        incoming-byte-rate   The average number of incoming bytes
   console-producer(client-id会变化)                                          received per second from all servers
   译文说明:producer每秒的平均写入流量   
   kafka.producer:type=producer-metrics,clientid=        outgoing-byte-rate   The average number of outgoing bytes 
   console-producer(client-id会变化)                                          sent per second to all servers.
   译文说明:producer每秒的输出流量
   kafka.producer:type=producer-metrics,clientid=        request-rate         The average number of requests
   console-producer(client-id会变化)                                          sent per second to the broker
   译文说明:producer每秒发给broker的平均request次数
   kafka.producer:type=producer-metrics,clientid=        response-rate        The average number of responses
   console-producer(client-id会变化)                                          received per second from the broker.
   译文说明:producer每秒发给broker的平均response次数
   kafka.producer:type=producer-metrics,clientid=        request-latency      The average time taken for a
   console-producer(client-id会变化)                     -avg                 fetch request.
   译文说明:一个fetch请求的平均时间
   kafka.producer:type=producer-topic-metrics,clientid   record-send-rate     The average number of records
   =console-producer,topic=testjmx(client-id和topic                           sent per second for a topic.
   名称会变化)
   译文说明:每秒从topic发送的平均记录数
   kafka.producer:type=producer-topic-metrics,clientid   record-retry-total   The total number of retried
   =console-producer,topic=testjmx(client-id和topic                           record sends
   名称会变化)
   译文说明:重试发送的消息总数量
   kafka.producer:type=producer-topic-metrics,clientid   record-error-total   The total number of record
   =console-producer,topic=testjmx(client-id和topic                           sends that resulted in errors
   名称会变化)
   译文说明:发送错误的消息总数量
   
   consumer指标
   objectName                      指标项                  官网说明
   kafka.consumer:type=consumer-   records-lag-max         Number of messages the consumer lags behind the
   fetchmanager-metrics,client-id                          producer by. Published by the consumer, not broker.
   =consumer-1(client-id会变化)
   说明：由consumer提交的消息消费lag
   kafka.consumer:type=consumer-   records-consumed-rate   The average number of records
   fetchmanager-metrics,client-id                          consumed per second
   =consumer-1(client-id会变化)
   说明：每秒平均消费的消息数量
   (3).编程手段来获取监控指标
   查看要监控哪个指标：
   代码实现
package com.lagou.kafka.demo.monitor;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
public class JMXMonitorDemo {
	public static void main(String[] args) throws IOException,
MalformedObjectNameException, AttributeNotFoundException, MBeanException,
ReflectionException, InstanceNotFoundException {
	   String jmxServiceURL =
"service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi";
       JMXServiceURL jmxURL = null;
       JMXConnector jmxc = null;
       MBeanServerConnection jmxs = null;
       ObjectName mbeanObjName = null;
       Iterator sampleIter = null;
       Set sampleSet = null;
	   
       // 创建JMXServiceURL对象，参数是
       jmxURL = new JMXServiceURL(jmxServiceURL);
       // 建立到指定URL服务器的连接
       jmxc = JMXConnectorFactory.connect(jmxURL);
       // 返回代表远程MBean服务器的MBeanServerConnection对象
       jmxs = jmxc.getMBeanServerConnection();
       // 根据传入的字符串，创建ObjectName对象
       // mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");

       mbeanObjName = new ObjectName
("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=tp_eagle_01");

       // 获取指定ObjectName对应的MBeans
	   sampleSet = jmxs.queryMBeans(null, mbeanObjName);
       // 迭代器
       sampleIter = sampleSet.iterator();
       if (sampleSet.isEmpty()) {
	   
	   } else {
		   // 如果返回了，则打印信息
           while (sampleIter.hasNext()) {
			   // Used to represent the object name of an MBean and its class name.
               // If the MBean is a Dynamic MBean the class name should be retrieved
              //  from the MBeanInfo it provides.
              // 用于表示MBean的ObjectName和ClassName
              ObjectInstance sampleObj = (ObjectInstance) sampleIter.next();
              ObjectName objectName = sampleObj.getObjectName();
              // 查看指定MBean指定属性的值
              String count = jmxs.getAttribute(objectName, "Count").toString();
              System.out.println(count);
		   }
	  }
	  // 关闭
      jmxc.close();
   }
   
 }
   2).监控工具 Kafka Eagle
   我们可以使用Kafka-eagle管理Kafka集群
   核心模块：
   (1).面板可视化
   (2).主题管理，包含创建主题、删除主题、主题列举、主题配置、主题查询等
   (3).消费者应用：对不同消费者应用进行监控，包含Kafka API、Flink API、Spark API、Storm API、Flume
API、LogStash API等
   (4).集群管理：包含对Kafka集群和Zookeeper集群的详情展示，其内容包含Kafka启动时间、Kafka端口号、
Zookeeper Leader角色等。同时，还有多集群切换管理，Zookeeper Client操作入口
   (5).集群监控：包含对Broker、Kafka核心指标、Zookeeper核心指标进行监控，并绘制历史趋势图
   (6).告警功能：对消费者应用数据积压情况进行告警，以及对Kafka和Zookeeper监控度进行告警。同时，支持
邮件、微信、钉钉告警通知
   (7).系统管理：包含用户创建、用户角色分配、资源访问进行管理
   架构：
   (1).可视化：负责展示主题列表、集群健康、消费者应用等
   (2).采集器：数据采集的来源包含Zookeeper、Kafka JMX & 内部Topic、Kafka API（Kafka 2.x 以后版本)
   (3).数据存储：目前Kafka Eagle存储采用MySQL或SQLite，数据库和表的创建均是自动完成的，按照官方文档
进行配置好，启动Kafka Eagle就会自动创建，用来存储元数据和监控数据
   (4).监控：负责见消费者应用消费情况、集群健康状态
   (5).告警：对监控到的异常进行告警通知，支持邮件、微信、钉钉等方式
   (6).权限管理：对访问用户进行权限管理，对于管理员、开发者、访问者等不同角色的用户，分配不同的访问权限
   需要Kafka节点开启JMX。前面讲过了。
   # 下载编译好的包
   wget http://pkgs-linux.cvimer.com/kafka-eagle.zip
   # 配置kafka-eagle
   unzip kafka-eagle.zip
   cd kafka-eagle/kafka-eagle-web/target
   mkdir -p test
   cp kafka-eagle-web-2.0.1-bin.tar.gz test/
   tar xf kafka-eagle-web-2.0.1-bin.tar.gz
   cd kafka-eagle-web-2.0.1
   需要配置环境变量：
   KE_HOME=/root/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1
   PATH=PATH:$KE_HOME/bin
   conf下的配置文件：system-config.properties

######################################
# multi zookeeper & kafka cluster list
######################################
# 集群的别名，用于在kafka-eagle中进行区分。
# 可以配置监控多个集群，别名用逗号隔开
# kafka.eagle.zk.cluster.alias=cluster1,cluster2,cluster3
kafka.eagle.zk.cluster.alias=cluster1
# cluster1.zk.list=10.1.201.17:2181,10.1.201.22:2181,10.1.201.23:2181
# 配置当前集群的zookeeper地址，此处的值要与Kafka的server.properties中的zookeeper.connect的值一致
# 此处的前缀就是集群的别名
cluster1.zk.list=linux121:2181,linux122:2181,linux123:2181/myKafka
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zookeeper客户端连接数限制
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle⽹⻚端⼝号
######################################
kafka.eagle.webui.port=8048

######################################
# kafka 消费信息存储位置，用来兼容kafka低版本
######################################
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=true

######################################
# 管理员删除kafka中topic的口令
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka 集群是否开启了认证模式，此处是cluster1集群的配置，禁用
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate，示例配置
######################################
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate，示例配置
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=

######################################
# 存储监控数据的数据库地址
# kafka默认使用sqlite存储，需要指定和创建sqlite的目录
# 如 /home/lagou/hadoop/kafka-eagle/db
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/home/lagou/hadoop/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

######################################
# 还可以使⽤MySLQ存储监控数据
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?
useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456

######################################
# kafka eagle 设置告警邮件服务器
######################################
kafka.eagle.mail.enable=true
kafka.eagle.mail.sa=kafka_lagou_alert
kafka.eagle.mail.username=kafka_lagou_alert@163.com
kafka.eagle.mail.password=Pas2W0rd
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25

   也可以自行编译， https://github.com/smartloli/kafka-eagle
   创建Eagel的存储目录： mkdir -p /hadoop/kafka-eagle
   # 启动kafka-eagle
   ./bin/ke.sh start
   

   
	   
    
   
   
   
   